#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <unistd.h>
#include <getopt.h>

#include "cjson.h"
#include "zlog.h"
#include "sqlite3.h"

#include "ds18b20.h"
#include "database.h"
#include "mqtt_paser.h"


/* Keepalive value handed to mosquitto_connect() in main(). */
#define KEEP_ALIVE 60
/* NOTE(review): not referenced anywhere in the visible part of this file. */
#define MSG_MAX_SIZE  512

/*
 * Run flag for the main loop: main() keeps iterating while it is non-zero and
 * signal_handler() clears it.
 * NOTE(review): the flag is written from a signal handler; the conventional
 * type for that is `volatile sig_atomic_t`. Confirm no other translation unit
 * declares it as plain `int` before changing the type.
 */
int           g_running= 1;
/*
 * Program-wide configuration. main() passes its address to ini_parser() and
 * afterwards reads the mqtt_conf and common_conf members; my_connect_callback()
 * reads the subscription topic and QoS from it.
 */
mqtt_ctx_t    mqtt_ctx;

/* libmosquitto callbacks; defined after main() and registered on the client there. */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message);
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result);
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos);

/*
 * Termination-signal handler.
 *
 * main() installs this handler for both SIGINT (ctrl+c) and SIGTERM (kill).
 * Either signal clears g_running so the main loop finishes its current
 * iteration and falls through to the cleanup code. Any other signal number
 * is ignored.
 *
 * Only a single store to the flag is performed here; no logging or other
 * library calls are made from signal context.
 */
void signal_handler(int signum)
{
    if(signum == SIGINT || signum == SIGTERM)
    {
        g_running = 0;
    }
}

/*
 * Print the command-line help text to stdout.
 *
 * program: name to show in the header line (main() passes argv[0]).
 *
 * The header and every option are written on their own line.
 */
void print_usage(char *program)
{
    printf("%s usage:\n", program);
    printf("  -c(--path): specify conf path\n");
    printf("  -h(--help): print this help information\n");
}


/*
 * Program entry point.
 *
 * Flow:
 *   1. parse the command line (-c/--path <conf file>, -h/--help);
 *   2. load the configuration into the global mqtt_ctx via ini_parser();
 *   3. initialise zlog, open the local database, install signal handlers;
 *   4. create the mosquitto client, register callbacks, connect to the broker
 *      and start the libmosquitto network thread;
 *   5. loop until g_running is cleared: sample the temperature whenever more
 *      than mqtt_conf.interval seconds have elapsed, publish it (storing it in
 *      the database if the publish call fails), then replay up to 10 stored
 *      rows per iteration and delete each row that was handed to the client;
 *   6. release everything that was acquired, in reverse order.
 *
 * Returns:
 *    0  normal shutdown, or help requested
 *   -1  bad command line or zlog initialisation failure
 *   -2  configuration parsing or database open failure
 *   -3  mosquitto_new() failure
 *   -4  mosquitto_username_pw_set() failure
 *   -5  mosquitto_connect() failure
 *   -6  mosquitto_loop_start() failure
 */
int main(int argc, char **argv)
{
    struct mosquitto       *mosq = NULL;        /* MQTT client handle */
    sqlite3                *db = NULL;          /* store for samples that could not be published */
    char                    ini_path[64] = "";  /* conf file path given with -c; empty = not given */
    char                    buff[1024];         /* JSON payload buffer */
    double                  temp = 0.0;         /* temperature sample */
    int                     tim = 0;            /* timestamp of a row read back from the database */
    int                     rowid = 0;          /* id of the row read back from the database */
    int                     row;                /* number of rows reported by get_table() */
    int                     i;
    int                     ch;
    int                     n;
    int                     rc;                 /* libmosquitto return code */
    int                     rv = 0;             /* process exit code */
    int                     force_stop = 1;     /* force-cancel the network thread unless disconnect succeeded */

    /* What has been acquired so far; the cleanup code only undoes these. */
    int                     zlog_ready = 0;
    int                     db_ready = 0;
    int                     lib_ready = 0;
    int                     connected = 0;
    int                     loop_started = 0;

    time_t                  sample_time = 0;
    time_t                  last_time = 0;
    time_t                  curr_time;

    struct option opts[] = {
        {"path", required_argument, NULL, 'c'},
        {"help", no_argument, NULL,'h'},
        {NULL, 0, NULL, 0}
    };

    /* command line argument parsing */
    while((ch = getopt_long(argc, argv, "c:h", opts, NULL)) != -1)
    {
        switch(ch)
        {
            case 'c':
                /* snprintf always NUL-terminates and lets us detect truncation */
                n = snprintf(ini_path, sizeof(ini_path), "%s", optarg);
                if(n < 0 || (size_t)n >= sizeof(ini_path))
                {
                    fprintf(stderr, "conf path is too long (max %zu characters)\n", sizeof(ini_path) - 1);
                    return -1;
                }
                break;

            case 'h':
                print_usage(argv[0]);
                return 0;

            default:
                /* unknown option or missing argument */
                print_usage(argv[0]);
                return -1;
        }
    }

    /* the configuration file is mandatory */
    if(ini_path[0] == '\0')
    {
        print_usage(argv[0]);
        return -1;
    }

    /* parse configuration file data (zlog is not available yet, so report on stderr) */
    if(ini_parser(&mqtt_ctx, ini_path) < 0)
    {
        fprintf(stderr, "parse conf file '%s' failed\n", ini_path);
        rv = -2;
        goto cleanup;
    }

    /* init zlog */
    if(dzlog_init("./etc/zlog.conf", "class"))
    {
        printf("zlog_init false\n");
        rv = -1;
        goto cleanup;
    }
    zlog_ready = 1;
    dzlog_notice("zlog init successfully\n");

    /* open database */
    if(open_database(&db, mqtt_ctx.common_conf.dbfile) < 0)
    {
        dzlog_error("open database failed\n");
        rv = -2;
        goto cleanup;
    }
    db_ready = 1;

    /* ctrl+c and kill both request a clean shutdown */
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    /* mosquitto initialize */
    mosquitto_lib_init();
    lib_ready = 1;

    /* create a new mosquitto client */
    mosq = mosquitto_new(mqtt_ctx.mqtt_conf.client_id, true, NULL);
    if(!mosq)
    {
        dzlog_error("create client failed..\n");
        rv = -3;
        goto cleanup;
    }

    /* set mosquitto user name and password */
    rc = mosquitto_username_pw_set(mosq, mqtt_ctx.mqtt_conf.usrname, mqtt_ctx.mqtt_conf.password);
    if(rc != MOSQ_ERR_SUCCESS)
    {
        dzlog_error("mosquitto username and passwd failure: %s\n", mosquitto_strerror(rc));
        rv = -4;
        goto cleanup;
    }

    /* called after the broker answers the connection request */
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    /* called when a message on a subscribed topic arrives */
    mosquitto_message_callback_set(mosq, my_message_callback);
    /* called when the broker responds to a subscription request */
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

    /* connect broker */
    rc = mosquitto_connect(mosq, mqtt_ctx.mqtt_conf.host, mqtt_ctx.mqtt_conf.port, KEEP_ALIVE);
    if(rc != MOSQ_ERR_SUCCESS)
    {
        dzlog_error("Unable to connect: %s\n", mosquitto_strerror(rc));
        rv = -5;
        goto cleanup;
    }
    connected = 1;

    /* start the libmosquitto network thread */
    rc = mosquitto_loop_start(mosq);
    if(rc != MOSQ_ERR_SUCCESS)
    {
        dzlog_error("mosquitto loop error: %s\n", mosquitto_strerror(rc));
        rv = -6;
        goto cleanup;
    }
    loop_started = 1;

    while(g_running)
    {
        /* get the current time */
        time(&curr_time);

        /* sample once more than mqtt_conf.interval seconds have elapsed */
        if((curr_time - last_time) > mqtt_ctx.mqtt_conf.interval)
        {
            /* NOTE(review): the sensor read result is not checked, as in the original code */
            ds18b20_get_temperature(&temp);
            time(&sample_time);
            memset(buff, 0, sizeof(buff));
            /* data packages */
            pack_json(buff, temp, mqtt_ctx.common_conf.devid, sample_time);
            dzlog_notice("buff: %s will be seted\n", buff);
            last_time = curr_time;

            /* publish */
            rc = mosquitto_publish(mosq, NULL, mqtt_ctx.mqtt_conf.topic_pub, (int)strlen(buff), buff,
                                   mqtt_ctx.mqtt_conf.pub_qos, false);
            if(rc != MOSQ_ERR_SUCCESS)
            {
                dzlog_error("publish failure : %s!\n", mosquitto_strerror(rc));
                /* publish failure, keep the sample in the database for a later retry */
                insert_data(db, sample_time, mqtt_ctx.common_conf.devid, temp);
            }
            else
            {
                dzlog_notice("publish successfully!\n");
            }
        }

        sleep(3);

        /* publish data in the database */
        row = get_table(db);
        if(row <= 0)
        {
            /* the handle stays open: it is needed again on the next iteration */
            dzlog_notice("There is no data in the database!\n");
            continue;
        }
        dzlog_notice("There have %d row data in the database!\n", row);

        /* only send up to 10 pieces of data per iteration */
        row = row < 10 ? row : 10;

        for(i = 0; i < row; i++)
        {
            /* get data from the database */
            get_data(db, mqtt_ctx.common_conf.devid, &tim, &temp, &rowid, sizeof(mqtt_ctx.common_conf.devid));
            sample_time = (time_t)tim;
            memset(buff, 0, sizeof(buff));
            pack_json(buff, temp, mqtt_ctx.common_conf.devid, sample_time);
            dzlog_notice("buff: %s in database will be seted\n", buff);

            rc = mosquitto_publish(mosq, NULL, mqtt_ctx.mqtt_conf.topic_pub, (int)strlen(buff), buff, 0, false);
            if(rc != MOSQ_ERR_SUCCESS)
            {
                dzlog_error("publish failure : %s!\n", mosquitto_strerror(rc));
                /* the row was not deleted, so it is retried on a later iteration */
                break;
            }

            dzlog_notice("publish data in database successfully!\n");
            delete_data(db, rowid);
        }
    }

    /* the loop ended because a termination signal was received */
    rv = 0;

cleanup:
    if(mosq)
    {
        if(connected)
        {
            /* a clean disconnect lets the network thread end by itself */
            force_stop = (mosquitto_disconnect(mosq) != MOSQ_ERR_SUCCESS);
        }
        if(loop_started)
        {
            mosquitto_loop_stop(mosq, force_stop);
        }
        /* free mosquitto */
        mosquitto_destroy(mosq);
    }
    if(lib_ready)
    {
        mosquitto_lib_cleanup();
    }
    if(db_ready)
    {
        sqlite3_close(db);
    }
    if(zlog_ready)
    {
        zlog_fini();
    }
    return rv;
}


/*
 * Message callback registered with mosquitto_message_callback_set() in main().
 *
 * Logs the topic and payload of every received message. The payload is a
 * length-delimited byte buffer (payload / payloadlen), so it is printed with
 * an explicit length bound instead of being treated as a NUL-terminated
 * string. Messages without a payload are logged as "(null)".
 *
 * mosq and userdata are not used.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    (void)mosq;
    (void)userdata;

    if(message == NULL)
    {
        return;
    }

    if(message->payloadlen > 0 && message->payload != NULL)
    {
        dzlog_info("%s:   %.*s \n", message->topic, message->payloadlen, (const char *)message->payload);
    }
    else
    {
        dzlog_info("%s (null)\n", message->topic);
    }
}

/*
 * Connect callback registered with mosquitto_connect_callback_set() in main().
 *
 * A zero result is treated as a successful connection: the client then
 * subscribes to mqtt_ctx.mqtt_conf.topic_sub with the configured QoS and
 * logs an error if the subscribe request is rejected. A non-zero result is
 * logged as a failed connection and nothing else is done.
 */
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    int sub_rc;

    /* connection refused or failed: nothing to subscribe to */
    if(result != 0)
    {
        dzlog_error( "Connect failed\n");
        return;
    }

    /* subscribe to the configured topic on successful connect */
    sub_rc = mosquitto_subscribe(mosq, NULL, mqtt_ctx.mqtt_conf.topic_sub, mqtt_ctx.mqtt_conf.sub_qos);
    if(MOSQ_ERR_SUCCESS == sub_rc)
    {
        return;
    }

    dzlog_error("Subscribed failed\n");
}


/*
 * Subscribe callback registered with mosquitto_subscribe_callback_set() in main().
 *
 * Logs the message id of the subscribe request together with every granted
 * QoS value, starting at index 0, as one log record.
 *
 * mosq and userdata are not used.
 */
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    char    line[256];   /* comma-separated list of granted QoS values */
    size_t  used = 0;
    int     n;
    int     i;

    (void)mosq;
    (void)userdata;

    line[0] = '\0';
    for(i = 0; granted_qos != NULL && i < qos_count; i++)
    {
        n = snprintf(line + used, sizeof(line) - used, "%s%d", (i == 0) ? "" : ", ", granted_qos[i]);
        if(n < 0 || (size_t)n >= sizeof(line) - used)
        {
            /* out of room: drop the partially written entry and log what fits */
            line[used] = '\0';
            break;
        }
        used += (size_t)n;
    }

    dzlog_notice("Subscribed (mid: %d): %s\n", mid, line);
}

